package com.jie.flink.cdc.mycdc;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.types.Row;

/**
 * @author alanchan

DROP TABLE IF EXISTS emp;
CREATE TABLE emp (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
`name` VARCHAR(255) ,
`age` INTEGER ,
`gender` INTEGER,
`address` VARCHAR(512)
);

 */
public class TestJdbcDemoSix {

	/**
	 *
	 * SHOW VARIABLES LIKE '%time_zone%'
	 *
	 * SET GLOBAL time_zone = '+8:00';		-- 以后每次连接mysql都是东八区
	 * SET time_zone = '+8:00';			-- 只有此次连接mysql是东八区，下次连接mysql又回到系统默认时区
	 *
	 * @param args
	 * @throws DatabaseNotExistException
	 * @throws CatalogException
	 */
	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
		/*

		 */

		String createTableSql = "CREATE TABLE b (\n" +
				"  dt date,\n" +
				"  user_id bigint,\n" +
				"  member_level int,\n" +
				"  user_name string\n" +
				") WITH (\n" +
				"   'connector' = 'jdbc',\n" +
				"   'url' = 'jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai',\n" +
				"   'username' = 'root',\n" +
				"   'password' = 'root',\n" +
				"   'table-name' = 'dim_user_info'\n" +
				");";

		tenv.executeSql(createTableSql);

		String createTableSqlaa = "CREATE TABLE aa (\n" +
				"    event_id varchar(8),\n" +
				"    page_id  varchar(2),\n" +
				"    action_time  bigint,\n" +
				"    user_id  int\n" +
				") WITH (\n" +
				"   'connector' = 'jdbc',\n" +
				"   'url' = 'jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai',\n" +
				"   'username' = 'root',\n" +
				"   'password' = 'root',\n" +
				"   'table-name' = 'a'\n" +
				")";

		tenv.executeSql(createTableSqlaa);

		String mysql = "select * from aa";
		Table table = tenv.sqlQuery(mysql);
		DataStream<Row> rowDataStream = tenv.toChangelogStream(table);
		tenv.createTemporaryView("a",rowDataStream, Schema.newBuilder()
				.column("event_id", DataTypes.STRING())
				.column("page_id", DataTypes.STRING())
				.column("action_time", DataTypes.BIGINT())
				.column("user_id", DataTypes.INT())
				.columnByExpression("pt", "proctime()")
				.build());


/*		String sql = "SELECT a.*,b.* FROM a left join b for system_time as of a.pt \n" +
				" on a.user_id=b.user_id and b.dt=TO_DATE('2024-01-16') ";*/
/*		String sql = "explain SELECT a.*,b.* FROM a left join b for system_time as of a.pt \n" +
				" on a.user_id=b.user_id and b.dt=TO_DATE('2024-01-17') "; *///查看执行计划

		String sql = " SELECT a.*,b.* FROM a left join b for system_time as of a.pt \n" +
				" on a.user_id=b.user_id  and b.dt=TO_DATE('2024-01-17')";

		tenv.executeSql(sql).print();

/*		Table table = tenv.sqlQuery(sql);
		table.printSchema();
		tenv.toChangelogStream(table).print();*/
//		env.execute();
	}

}
